Skip to content

fix: handle cancelled gRPC streams in supervisors#216

Draft
adarsh0728 wants to merge 3 commits into
mainfrom
fix-guard
Draft

fix: handle cancelled gRPC streams in supervisors#216
adarsh0728 wants to merge 3 commits into
mainfrom
fix-guard

Conversation

@adarsh0728
Copy link
Copy Markdown
Member

@adarsh0728 adarsh0728 commented May 5, 2026

Summary

  • This PR fixes a stuck UDF container issue caused by writes to a gRPC response stream after the client/runtime had already cancelled or closed the call.
  • The affected supervisors now guard all response stream writes, distinguish inbound gRPC stream errors from child actor failures, and finish streams only after all in-flight child actors have drained.

What Changed

  • Added guarded response handling in mapper, map-streamer, and source-transformer supervisors.

    • onNext, onError, and onCompleted are now protected so closed/cancelled gRPC calls do not throw back into Akka supervision.
    • This prevents call already closed from triggering supervisor restart/preRestart loops.
  • Added a shared InputStreamError actor message.

    • Service.onError() now sends InputStreamError instead of wrapping the throwable as an Exception.
    • This keeps inbound request-stream failures separate from child actor UDF failures.
    • As a result, supervisors no longer decrement active child counts for client cancellation/network errors.
  • Added drain-based completion through inputCompleted and finishIfDrained().

    • EOF no longer immediately calls responseObserver.onCompleted().
    • The supervisor waits until all started child actors have replied or failed.
    • If a UDF error occurred, the shutdown signal is completed once active work drains.
    • Otherwise, normal completion is sent after active work drains.
  • Updated map-streamer EOF handling.

    • Service.onCompleted() now routes EOF through the supervisor instead of completing the response stream directly.
    • This aligns map-streamer behaviour with mapper and source-transformer.
  • [Tests] Preserved production shutdown behaviour while making tests safe.

    • Mapper/source-transformer production servers still call System.exit(0) on failure.
    • Test constructors disable JVM exit so expected failure-path tests do not put the Maven test JVM into shutdown.

Why

  • Previously, EOF or client cancellation could close the response stream while child actors were still processing. Later child responses could then attempt onNext() on a closed gRPC call, producing errors like call already closed.

  • Also, user exceptions did not always complete shutdownSignal; shutdown could depend on a later request arriving. If no later request arrived, the UDF container could remain stuck.

Issue observed in live Pipeline

udf container was stuck after this error

Caused by: java.lang.IllegalStateException: call already closedat java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)at akka.dispatch.Mailbox.run(Mailbox.scala:230)at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:516)at akka.actor.ActorCell.faultRecreate(ActorCell.scala:410)at akka.actor.dungeon.FaultHandling.faultRecreate$(FaultHandling.scala:36)at akka.actor.dungeon.FaultHandling.faultRecreate(FaultHandling.scala:98)at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)at akka.actor.dungeon.FaultHandling$$anon$1.applyOrElse(FaultHandling.scala:336)at akka.actor.dungeon.FaultHandling$$anon$1.applyOrElse(FaultHandling.scala:341)at scala.runtime.function.JProcedure1.apply(JProcedure1.java:10)at scala.runtime.function.JProcedure1.apply(JProcedure1.java:15)at akka.actor.dungeon.FaultHandling.$anonfun$1(FaultHandling.scala:96)at akka.actor.PreRestartException$.apply(Actor.scala:214)akka.actor.PreRestartException: akka://mapper/user/$c: exception in preRestart(class io.grpc.StatusRuntimeException, class io.numaproj.numaflow.map.v1.MapOuterClass$MapResponse)[ERROR] [04/27/2026 10:17:25.177] [mapper-akka.actor.default-dispatcher-372] [akka://mapper/user/$c] call already closed[WARN] [04/27/2026 10:17:25.176] [mapper-akka.actor.default-dispatcher-372] [akka.actor.ActorSystemImpl(mapper)] supervisor pre restart was executed due to: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception[INFO] [akkaDeadLetter][04/27/2026 10:17:25.176] [mapper-akka.actor.default-dispatcher-377] [akka://mapper/user/$c/$e] Message [akka.dispatch.sysmsg.Suspend] from Actor[akka://mapper/user/$c/$e#-865804973] to Actor[akka://mapper/user/$c/$e#-865804973] was not delivered. [7] dead letters encountered. If this is not an expected behavior then Actor[akka://mapper/user/$c/$e#-865804973] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)at akka.dispatch.Mailbox.run(Mailbox.scala:231)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)at akka.actor.ActorCell.invoke(ActorCell.scala:547)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:219)at akka.actor.Actor.aroundReceive$(Actor.scala:471)at akka.actor.Actor.aroundReceive(Actor.scala:537)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:270)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:269)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:213)at scala.PartialFunction.applyOrElse(PartialFunction.scala:214)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at io.numaproj.numaflow.mapper.MapSupervisorActor.sendResponse(MapSupervisorActor.java:118)at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)at io.grpc.Status.asRuntimeException(Status.java:525)io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception[ERROR] [04/27/2026 10:17:25.176] [mapper-akka.actor.internal-dispatcher-380] [akka://mapper/user/$c] CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception... 12 moreat akka.actor.dungeon.FaultHandling.faultRecreate(FaultHandling.scala:94)

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@KeranYang KeranYang self-requested a review May 5, 2026 13:13
Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented May 6, 2026

Codecov Report

❌ Patch coverage is 73.09942% with 46 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@0c3b01c). Learn more about missing BASE report.

Files with missing lines Patch % Lines
...numaflow/mapstreamer/MapStreamSupervisorActor.java 72.72% 12 Missing and 3 partials ⚠️
...o/numaproj/numaflow/mapper/MapSupervisorActor.java 78.00% 8 Missing and 3 partials ⚠️
...ow/sourcetransformer/TransformSupervisorActor.java 78.00% 8 Missing and 3 partials ⚠️
.../main/java/io/numaproj/numaflow/mapper/Server.java 25.00% 2 Missing and 1 partial ⚠️
...io/numaproj/numaflow/sourcetransformer/Server.java 25.00% 2 Missing and 1 partial ⚠️
...main/java/io/numaproj/numaflow/mapper/Service.java 0.00% 1 Missing ⚠️
...java/io/numaproj/numaflow/mapstreamer/Service.java 50.00% 1 Missing ⚠️
...o/numaproj/numaflow/sourcetransformer/Service.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #216   +/-   ##
=======================================
  Coverage        ?   62.69%           
  Complexity      ?      613           
=======================================
  Files           ?      156           
  Lines           ?     3707           
  Branches        ?      260           
=======================================
  Hits            ?     2324           
  Misses          ?     1203           
  Partials        ?      180           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant